#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <sys/mman.h>

#define DBG_SUBSYS S_LIBTASK

#include "adt.h"
#include "sysy_lib.h"
#include "lich_md.h"
#include "disk.h"
#include "../storage/md_parent.h"
#include "../../storage/controller/md_proto.h"
#include "nodectl.h"
#include "balance.h"
#include "configure.h"
#include "dbg.h"


/* nodectl key: when non-zero, __balance_immediately() wakes the worker early
 * (sem_post on __sem__) or, if a wake-up was posted within the last 3 s,
 * resets the key to "0". */
#define BALANCE_IMMEDIATELY "balance/immediately"
/* nodectl key: number of worker threads per round (default 1, capped at THREAD_MAX). */
#define BALANCE_THREAD      "balance/thread"
/* nodectl key: seconds to wait between rounds (default 60). */
#define BALANCE_INTERVAL    "balance/interval"
/* nodectl key: time value compared against gettime(); while it is in the
 * future, rounds are held back and status "suspend" is published. */
#define BALANCE_SUSPEND     "balance/suspend"
/* nodectl key: non-zero asks a running round to stop; reset to "0" at init. */
#define BALANCE_STOP        "balance/stop"
/* nodectl key: set to "1" when a round completes with every job successful; "0" at init. */
#define BALANCE_DONE        "balance/done"

/* nodectl key: progress text (status/balance/success/fail) written by __balance_dump(). */
#define BALANCE_INFO   "balance/info"


/* Posted by __balance_immediately() to cut the worker's wait short. */
static sem_t __sem__;
/* Time of the last sem_post on __sem__, used to debounce wake-ups. */
static time_t __last_post__;

/*
 * Store a string value under a nodectl key.
 * A failed nodectl_set() is treated as fatal (UNIMPLEMENTED).
 */
static void __config_set(const char *key, const char *value)
{
        if (unlikely(nodectl_set(key, value)))
                UNIMPLEMENTED(__DUMP__);
}

/*
 * Read an integer nodectl key, falling back to _default
 * (the fallback is handled by nodectl_get_int()).
 */
static int __config_get_int(const char *key, int _default)
{
        int value = nodectl_get_int(key, _default);

        return value;
}

/* Fetch the configured worker thread count (balance/thread, default 1). */
static void __balance_get_thread(int *thread)
{
        int nthread = __config_get_int(BALANCE_THREAD, 1);

        *thread = nthread;
}

/* Fetch the stop request flag (balance/stop, default 0 = keep running). */
static void __balance_get_stop(int *stop)
{
        int flag = __config_get_int(BALANCE_STOP, 0);

        *stop = flag;
}

/* Record completion of a round by setting balance/done to "1". */
static void __balance_set_done()
{
        const char *key = BALANCE_DONE;

        __config_set(key, "1");
}

/*
 * Reset the control/state keys at startup: balance/stop and then
 * balance/done are both set to "0".
 */
static void __balance_set_init()
{
        static const char *const keys[] = { BALANCE_STOP, BALANCE_DONE };
        size_t k;

        for (k = 0; k < sizeof(keys) / sizeof(keys[0]); k++)
                __config_set(keys[k], "0");
}

static const char *__balance_status2str(int status)
{
        if (status == __SCAN__)
                return "scan";
        else if (status == __RUNNING__)
                return "running";
        else if (status == __STOPPED__)
                return "stopped";
        else if (status == __FAILED__)
                return "failed";
        else if (status == __DONE__)
                return "done";
        else
                return "unknown";
}


/*
 * Publish the current balance progress to the balance/info nodectl key.
 *
 * @param arg        round state; arg->status is overwritten unless
 *                   new_status is __S_BAL_UNKNOWN__
 * @param new_status status to record, or __S_BAL_UNKNOWN__ to keep the
 *                   current status and only refresh the counters
 */
static void __balance_dump(arg_t *arg, balance_status_t new_status)
{
        char value[MAX_BUF_LEN];

        if (new_status != __S_BAL_UNKNOWN__)
                arg->status = new_status;

        /* Bound by the real buffer size: the old MAX_PATH_LEN limit is
         * unrelated to value[] and could overrun it if larger. */
        snprintf(value, sizeof(value),
                 "status:%s\n"
                 "balance:%llu\n"
                 "success:%llu\n"
                 "fail:%llu\n",
                 __balance_status2str(arg->status),
                 (LLU)arg->balance,
                 (LLU)arg->success,
                 (LLU)arg->fail);
        __config_set(BALANCE_INFO, value);
}

/*
 * Process one job record: move a raw chunk to its target disks.
 *
 * Resolves the chunk's parent with md_parent_get(), reads the parent's
 * chkinfo, and calls md_chunk_move() on the node found in the parent's
 * diskid[0] (the original code labels this node the controller).
 *
 * @param res job record; res->chkid must be a __RAW_CHUNK__
 * @return 0 on success, otherwise the error from the failing md_* call
 */
static int __balance_thread_worker__(const res_t *res)
{
        int ret;
        chkid_t chkid;
        fileid_t parent, srv;
        nid_t ctl;
        char infobuf[CHKINFO_MAX];
        chkinfo_t *pinfo = (void *)infobuf;

        chkid = res->chkid;
        YASSERT(chkid.type == __RAW_CHUNK__);

        ret = md_parent_get(&chkid, &parent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = md_chunk_getinfo(NULL, &parent, pinfo, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* controller: first disk entry of the parent */
        ctl = pinfo->diskid[0].id;
        srv = parent;
        ret = md_chunk_move(&ctl, &srv, &chkid, res->disks, res->repnum);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Worker thread body for one segment.
 *
 * Handles seg->res[0 .. seg->count) in order and counts each outcome in
 * seg->success / seg->fail. seg->stop is checked before every record so
 * __balance_stop() can interrupt the loop. seg->stopped is set on exit.
 *
 * @param arg balance_seg_t * describing the segment
 * @return NULL
 */
static void *__balance_thread_worker(void *arg)
{
        balance_seg_t *seg = arg;
        int idx;

        DINFO("start count %u\n", seg->count);

        for (idx = 0; idx < seg->count; idx++) {
                int rc;

                if (seg->stop) {
                        DINFO("force exit\n");
                        break;
                }

                rc = __balance_thread_worker__(&seg->res[idx]);
                if (rc != 0) {
                        seg->fail++;
                        DBUG("fail %u\n", seg->fail);
                        continue;
                }

                seg->success++;
                DBUG("success %u\n", seg->success);
        }

        seg->stopped = 1;

        DINFO("exit\n");
        return NULL;
}

/*
 * Start a detached thread running __balance_thread_worker(seg).
 *
 * @param seg segment the new thread will process
 * @return 0 on success, otherwise the pthread error code
 */
static int __balance_thread_create(balance_seg_t *seg)
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        ret = pthread_attr_init(&ta);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __balance_thread_worker, seg);
        /* the attribute object is no longer needed once create returns */
        (void) pthread_attr_destroy(&ta);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Split the mmap()ed job array across worker threads and start them.
 *
 * @param arg     round state; arg->addr holds arg->balance res_t records
 * @param _segs   segment descriptors, at least THREAD_MAX entries
 * @param _thread requested thread count (balance/thread)
 * @param count   out: number of segments/threads started
 *
 * Sets arg->status to __RUNNING__.
 */
static void __balance_worker_startup(arg_t *arg, balance_seg_t *_segs, int _thread, int *count)
{
        int ret, thread, left, step, i;
        balance_seg_t *seg;

        /* Clamp into [1, THREAD_MAX]: a configured value <= 0 would
         * otherwise divide by zero below. */
        thread = THREAD_MAX < _thread ? THREAD_MAX : _thread;
        if (thread < 1)
                thread = 1;
        /* fewer than 100 records are handled by a single thread */
        thread = arg->balance < 100 ? 1 : thread;

        left = arg->balance;
        /* Ceiling division: thread * step >= balance, so every record is
         * assigned and work is spread evenly. The old THREAD_MAX bias could
         * hand nearly everything to the first segment. */
        step = (arg->balance + thread - 1) / thread;

        DINFO("thread %d, left %d, step %d\n", thread, left, step);

        for (i = 0; i < thread; i++) {
                seg = &_segs[i];
                /* Byte offset of the first unassigned record. Explicit char*
                 * arithmetic avoids the void* extension, and it never goes past
                 * one-past-the-end, even for empty trailing segments. */
                seg->res = (void *)((char *)arg->addr
                                    + (size_t)(arg->balance - left) * sizeof(res_t));
                seg->count = left < step ? left : step;
                seg->success = 0;
                seg->fail = 0;
                seg->running = 1;
                seg->stop = 0;
                seg->stopped = 0;

                ret = __balance_thread_create(seg);
                YASSERT(ret == 0);

                left -= seg->count;
        }

        *count = thread;
        arg->status = __RUNNING__;
}

/*
 * Seconds remaining until the balance/suspend time, or 0 if that time is
 * not in the future (compared against gettime()).
 */
static int __balance_suspend()
{
        time_t until = nodectl_get_int(BALANCE_SUSPEND, 0);
        time_t now = gettime();

        return until > now ? (int)(until - now) : 0;
}

/*
 * Ask every worker to stop and wait (1 s polls) until all report stopped.
 * YASSERTs if that takes 100 polls or more. Afterwards publishes
 * __FAILED__ if any job failed, else __STOPPED__.
 *
 * @param arg    round state used for the final dump
 * @param seg    segment array
 * @param thread number of segments in use
 */
static  void __balance_stop(arg_t *arg, balance_seg_t *seg, int thread)
{
        int i, done, retry;

        for (i = 0; i < thread; i++)
                seg[i].stop = 1;

        for (retry = 0; ; retry++) {
                done = 0;
                for (i = 0; i < thread; i++)
                        done += seg[i].stopped ? 1 : 0;

                if (done == thread)
                        break;

                YASSERT(retry < 100);
                DWARN("stop %u --> %u, retry %u\n", done, thread, retry);
                sleep(1);
        }

        __balance_dump(arg, arg->fail ? __FAILED__ : __STOPPED__);
}

/*
 * Finish a round in which every job succeeded: set balance/done and
 * publish __DONE__.
 *
 * @return always 0
 */
static int __balance_done(arg_t *arg)
{
        YASSERT(arg->balance == arg->success);

        __balance_set_done();
        __balance_dump(arg, __DONE__);
        return 0;
}

/*
 * Per-entry callback for disk_maping->iterator() over "metadata".
 *
 * Entries that are not __VOLUME_CHUNK__ are skipped. So are volumes whose
 * chkinfo does not list this node first (net_islocal on CHKINFO_FIRSTNID).
 * For the remaining volumes, gen_jobs() is called with the scan state.
 * It presumably appends res_t job records to arg->fd and bumps
 * arg->balance — TODO confirm against gen_jobs().
 *
 * gen_jobs() errors EAGAIN/ENONET/ENOSYS/EBUSY are retried through
 * USLEEP_RETRY (arguments suggest up to 50 tries with a 100 ms sleep —
 * confirm against the macro). Every error is logged with DWARN and
 * swallowed: the callback returns void.
 *
 * @param _arg  arg_t * for the current scan
 * @param _ent  const chkid_t * of the metadata entry
 * @param _pool char * pool passed to md_chunk_getinfo() and gen_jobs()
 */
static void __balance_scan__(void *_arg, void *_ent, void *_pool)
{
        int ret, retry=0;
        arg_t *arg = _arg;
        const chkid_t *chkid = _ent;
        chkinfo_t *chkinfo;
        char buf[CHKINFO_MAX], *pool = _pool;
        chkinfo = (void *)buf;

        /* only volume entries produce jobs */
        if (chkid->type != __VOLUME_CHUNK__) {
                goto out;
        }

        ret = md_chunk_getinfo(pool, chkid, chkinfo, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* only the node listed first in the volume's chkinfo generates jobs */
        if (!net_islocal(CHKINFO_FIRSTNID(chkinfo))) {
                goto out;
        }

/* NOTE: the label `retry` and the counter variable `retry` share a name;
 * both are passed to USLEEP_RETRY (labels have their own namespace in C). */
retry:
        ret = gen_jobs(pool, chkid, arg);
        if (unlikely(ret)) {
                /* transient errors: sleep and jump back to `retry` */
                if (ret == EAGAIN || ret == ENONET || ret == ENOSYS || ret == EBUSY) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                }

                GOTO(err_ret, ret);
        }

        DINFO("scan "CHKID_FORMAT"\n", CHKID_ARG(chkid));

out:
        return;
err_ret:
        DWARN("scan error: "CHKID_FORMAT", ret: %d\n", CHKID_ARG(chkid), ret);
        return;
}

/*
 * Scan phase of a round: collect job records into an anonymous temp file.
 *
 * Creates an unlinked temp file and stores its descriptor in arg->fd.
 * Counters are reset, then every "metadata" entry is passed to
 * __balance_scan__(). The caller owns arg->fd on success and must close it.
 *
 * @param arg round state (fd, balance, success, fail are initialized here)
 * @return 0 on success, or errno from mkstemp() if the temp file could not
 *         be created (arg is left untouched in that case)
 */
static int __balance_scan(arg_t *arg)
{
        int ret, fd;
        char path[MAX_PATH_LEN];

        snprintf(path, sizeof(path), "/tmp/balance-XXXXXX");
        fd = mkstemp(path);
        if (unlikely(fd < 0)) {
                /* without a backing file the scan cannot record jobs; an
                 * empty result would otherwise be reported as "done" */
                ret = errno;
                GOTO(err_ret, ret);
        }

        /* unlink right away: the file then lives only as long as fd is open */
        unlink(path);

        arg->fd = fd;
        arg->balance = 0;
        arg->success = 0;
        arg->fail = 0;

        DINFO("begin to scan, path: %s, fd: %d, sql: %s\n", path, arg->fd, "metadata");
        disk_maping->iterator("metadata", __balance_scan__, arg);
        DINFO("scan done\n");

        return 0;
err_ret:
        return ret;
}

/*
 * One poll of a running round: aggregate per-segment counters, publish
 * progress, and decide whether the round is over.
 *
 * @param arg    round state; success/fail are recomputed from the segments
 * @param _segs  segment array
 * @param thread number of segments in use
 * @return 1 when the round ended (done, all workers stopped, or
 *         balance/stop requested), 0 while it should keep running
 */
static int __balance_worker_check(arg_t *arg, balance_seg_t *_segs, int thread)
{
        int stop, running, i, allstop;
        balance_seg_t *seg;

        /*
         * Sample the stopped flags BEFORE summing the counters. A worker
         * bumps success/fail before it sets stopped. So a worker that
         * finishes its last record between the two loops is no longer
         * misreported as "stopped" with stale counts. This is best effort:
         * the fields are plain, not atomic.
         */
        allstop = 1;
        for (i = 0; i < thread; i++) {
                if (!_segs[i].stopped)
                        allstop = 0;
        }

        arg->success = 0;
        arg->fail = 0;
        running = 0;
        for (i = 0; i < thread; i++) {
                seg = &_segs[i];
                arg->success += seg->success;
                arg->fail += seg->fail;
                running += seg->running;
        }

        DINFO("balance: %llu, success: %llu, fail: %llu, running: %u\n",
              (LLU)arg->balance, (LLU)arg->success, (LLU)arg->fail, running);

        __balance_dump(arg, __S_BAL_UNKNOWN__);

        YASSERT(arg->success + arg->fail <= arg->balance);
        if (arg->success == arg->balance) {
                __balance_done(arg);
                return 1;
        }

        if (allstop) {
                DINFO("stop...\n");
                __balance_stop(arg, _segs, thread);
                return 1;
        }

        YASSERT(running);

        __balance_get_stop(&stop);
        if (stop) {
                DINFO("stop...\n");
                __balance_stop(arg, _segs, thread);
                return 1;
        }

        return 0;
}

static int __balance(void *_arg)
{
        int ret, thread, _thread, _thread_new;
        const chkid_t *chkid;
        arg_t arg;
        balance_seg_t _segs[THREAD_MAX];

        (void) _arg;
        (void) chkid;

        ret = __balance_scan(&arg);
        if (unlikely(ret)) {
                //todo 处理ret
                GOTO(err_ret, ret);
                /*YASSERT(0);*/
        }

        if (arg.balance == 0) {
                DINFO("node clean\n");
                __balance_done(&arg);
                goto out;
        }

        DINFO("need balance %lu\n", arg.balance);

        arg.addr = mmap(NULL, arg.balance * sizeof(res_t), PROT_READ, MAP_PRIVATE, arg.fd, 0);
        if (arg.addr == MAP_FAILED) {
                ret = errno;
                UNIMPLEMENTED(__DUMP__);
        }

retry:
        __balance_get_thread(&_thread);
        __balance_worker_startup(&arg, _segs, _thread, &thread);

        while (1) {
                sleep(1);

                ret = __balance_worker_check(&arg, _segs, thread);
                if (unlikely(ret))
                        break;

                __balance_get_thread(&_thread_new);
                if (_thread != _thread_new) {
                        DINFO("interrupted by new thread argument\n");
                        __balance_stop(&arg, _segs, thread);
                        goto retry;
                }
        }

        munmap(arg.addr, arg.balance * sizeof(res_t));
        close(arg.fd);

out:
        return 0;
err_ret:
        return ret;
}

/*
 * Block until the next balance round should start.
 *
 * Publishes __WAITING__, then waits on __sem__ for up to balance/interval
 * seconds (default 60 seconds). A sem_post from __balance_immediately()
 * presumably ends the wait early.
 *
 * After that, while balance/suspend is still in the future, publishes
 * __SUSPEND__ and re-checks in waits of at most 10 seconds. Any wait result
 * other than success or ETIMEDOUT goes to UNIMPLEMENTED.
 *
 * @param arg state published through __balance_dump()
 */
static void __balance_sleep(arg_t *arg)
{
        int rc, wait_sec, left;

        wait_sec = nodectl_get_int(BALANCE_INTERVAL, 60); /* seconds; default 60 */

        DINFO("wait %d seconds\n", wait_sec);

        __balance_dump(arg, __WAITING__);

        rc = _sem_timedwait1(&__sem__, wait_sec);
        if (rc != 0 && rc != ETIMEDOUT)
                UNIMPLEMENTED(__DUMP__);

        for (;;) {
                left = __balance_suspend();
                if (left == 0)
                        break;

                __balance_dump(arg, __SUSPEND__);
                DINFO("balance suspend left %u\n", left);

                rc = _sem_timedwait1(&__sem__, left < 10 ? left : 10);
                if (rc == ETIMEDOUT)
                        continue;
                if (rc != 0)
                        UNIMPLEMENTED(__DUMP__);
                break;
        }
}

/*
 * Main loop of the detached balance thread started by balance_init().
 *
 * Waits for the next round with __balance_sleep(), then runs it with
 * __balance(). A failed round is retried after a 1 second back-off.
 *
 * @param _arg unused
 * @return never returns in practice
 */
static void *__balance_worker(void *_arg)
{
        int ret;
        arg_t arg;

        (void) _arg;

        /* __balance_sleep() -> __balance_dump() prints balance/success/fail
         * from this struct, so it must not start out uninitialized */
        memset(&arg, 0, sizeof(arg));

        while (1) {
                __balance_sleep(&arg);

                for (;;) {
                        ret = __balance(&arg);
                        if (ret == 0)
                                break;

                        /* back off so a persistent failure does not spin the CPU */
                        DWARN("balance round failed, ret: %d, retry\n", ret);
                        sleep(1);
                }
        }

        return NULL;
}

/* Clear balance/immediately back to "0" if it is currently set. */
static void __balance_mediately()
{
        if (__config_get_int(BALANCE_IMMEDIATELY, 0) != 0)
                __config_set(BALANCE_IMMEDIATELY, "0");
}

/*
 * nodectl callback for balance/immediately.
 *
 * When the key is non-zero, wakes the balance worker by posting __sem__.
 * If a post already happened within the last 3 seconds, it clears the
 * key instead of posting again.
 *
 * @return always 0
 */
static int __balance_immediately(void *context, uint32_t mask)
{
        time_t now;

        (void) context;
        (void) mask;

        if (__config_get_int(BALANCE_IMMEDIATELY, 0) == 0)
                return 0;

        DINFO("immediately\n");

        now = gettime();
        if (now - __last_post__ <= 3) {
                /* debounce: a wake-up was posted moments ago */
                __balance_mediately();
                return 0;
        }

        __last_post__ = now;
        sem_post(&__sem__);
        return 0;
}

/*
 * nodectl reset callback: re-registers balance/immediately with default
 * "0", __balance_immediately as its callback and this function as its
 * reset hook. Presumably invoked when the previous registration is lost —
 * confirm against nodectl. Registration failure is fatal (UNIMPLEMENTED).
 *
 * @return always 0
 */
static int __balance_immediately_reset(void *context, uint32_t mask)
{
        int rc;

        (void) context;
        (void) mask;

        DINFO("immediately_reset\n");

        rc = nodectl_register(BALANCE_IMMEDIATELY, "0", __balance_immediately,
                              __balance_immediately_reset, NULL);
        if (unlikely(rc))
                UNIMPLEMENTED(__DUMP__);

        return 0;
}

/*
 * Initialize the balance subsystem.
 *
 * Sets up the wake-up semaphore and resets balance/stop and balance/done.
 * Registers the balance/immediately watch, then starts the detached
 * __balance_worker thread.
 *
 * @return 0 on success, otherwise an errno/pthread error code
 */
int balance_init()
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        ret = sem_init(&__sem__, 0, 0);
        if (ret < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        __balance_set_init();

        ret = nodectl_register(BALANCE_IMMEDIATELY, "0", __balance_immediately, __balance_immediately_reset, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = pthread_attr_init(&ta);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __balance_worker, NULL);
        /* the attribute object is no longer needed once create returns */
        (void) pthread_attr_destroy(&ta);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}
